package com.proxy.server.backend.bio;

import com.proxy.common.constant.CapabilitiesType;
import com.proxy.common.constant.PacketType;
import com.proxy.common.exception.ProxyIoException;
import com.proxy.common.packet.*;
import com.proxy.common.utils.CharsetUtil;
import com.proxy.common.utils.SecurityUtil;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by liufish on 16/7/27.
 */
public class BioChannel {

    private BioBackendClient client;

    private Socket socket;

    private AtomicBoolean  isHeartBeating = new AtomicBoolean(false);

    /**
     * 上一次活动的时间,用于心跳控制。
     */
    private AtomicLong lastActiveTime = new AtomicLong(System.currentTimeMillis());

    public BioChannel(BioBackendClient client){
        this.client = client;
    }

    public boolean connect() {
        try {
            this.socket = new Socket();
            this.socket.setReuseAddress(true);
            this.socket.setKeepAlive(true);
            this.socket.setTcpNoDelay(true);
            this.socket.setSoLinger(true,0);
            this.socket.setSoTimeout(1000);
            this.socket.connect(new InetSocketAddress(client.getNode().getHost(),client.getNode().getPort()),1000);

            //FutureTask
            BinaryPacket bin = new BinaryPacket(socket.getInputStream());

            HandshakePacket handshakePacket = new HandshakePacket();
            handshakePacket.read(bin);

            //发送登录用户名密码数据库验证信息
            AuthPacket authPacket = new AuthPacket();
            authPacket.packetId = ++ handshakePacket.packetId;
            authPacket.clientFlags = CapabilitiesType.getServerCapabilities();
            authPacket.maxPacketSize =  1024*1024*16 ; //16M
            authPacket.charsetIndex = handshakePacket.serverCharsetIndex;
            authPacket.user = (client.getNode().getUser());
            final byte[] seed = handshakePacket.seed;
            int charsetIndex = handshakePacket.serverCharsetIndex;
            byte[] password = client.getNode().getPassword().getBytes(Charset.forName(CharsetUtil.getCharset(charsetIndex)));
            byte[] restOfScramble = handshakePacket.restOfScrambleBuff;
            byte[] authSeed = new byte[seed.length + restOfScramble.length];
            System.arraycopy(seed, 0, authSeed, 0, seed.length);
            System.arraycopy(restOfScramble, 0, authSeed, seed.length, restOfScramble.length);
            authPacket.password = SecurityUtil.scramble411(password, authSeed);
            authPacket.database = client.getNode().getSchema();
            authPacket.write(socket.getOutputStream());

            //接收包
            bin = new BinaryPacket(socket.getInputStream());
            //记录活动时间
            lastActiveTime.set(System.currentTimeMillis());
            switch (bin.body[0]){
                case  0x00:
                    //success
                    return true;
                case (byte)0xff :
                    //fail
                    throw new ProxyIoException(String.format("user{%s}/password/db{%s} error",authPacket.user,authPacket.database));
                case (byte) 0xfe:
                    // 发送323响应认证数据包
                    Reply323Packet reply323Packet = new Reply323Packet();
                    reply323Packet.read(bin);

                    if (client.getNode().getPassword() != null && client.getNode().getPassword().length() > 0) {
                        byte[] seed323 = SecurityUtil.scramble323(client.getNode().getPassword(), new String(seed)).getBytes();
                        reply323Packet.seed = seed323;
                    }
                    reply323Packet.write(socket.getOutputStream());
                    //继续接收半包
                    break;
            }

            //如果是半包,继续接收
            bin = new BinaryPacket(socket.getInputStream());
            switch (bin.body[0]) {
                case 0x00:
                    //success
                    return true;
                case (byte) 0xff:
                    //fail
                    throw new ProxyIoException(String.format("user{%s}/password/db{%s} error",authPacket.user,authPacket.database));
            }

        }catch (Exception ex){
            throw new ProxyIoException(ex);
        }
        return false;
    }

    public BinaryPacket read(){
        boolean successRead = true;
        try {
            while (isHeartBeating.get()){
                //防止cpu暴涨
                Thread.sleep(4);
            }
            return new BinaryPacket(this.socket.getInputStream());
        }catch (Exception ex){
            successRead = false;
            throw new ProxyIoException(ex);
        }finally {
            //记录活动时间
            if(successRead){
                lastActiveTime.set(System.currentTimeMillis());
            }
        }
    }


    public boolean isActive(){
        boolean closed = socket.isClosed();
        boolean connected = socket.isConnected();
        boolean outputShutdown = socket.isOutputShutdown();
        boolean inputShutdown = socket.isInputShutdown();
        return connected && !closed && !inputShutdown && !outputShutdown;
    }

    public void close(){
        if(!socket.isClosed()){
            try {
                socket.getOutputStream().flush();
                socket.close();
            } catch (IOException e) {
                throw new ProxyIoException(e);
            }
        }
    }

    public OutputStream getOutputStream() {
        try {
            while (isHeartBeating.get()){
                //防止cpu暴涨
                Thread.sleep(4);
            }
            return socket.getOutputStream();
        }catch (Exception ex){
            throw new ProxyIoException(ex);
        }
    }

    /**
     * 需要判断上一次心跳,或者执行时间来判断是否要执行心跳。
     */
    public void heartbeat(){
        try {
            //如果 当前时间 - 上一次时间 < 心跳间隔时间
            if(System.currentTimeMillis() - lastActiveTime.get() < client.getNode().getHeartbeatTime()  ){
                return;
            }
            BinaryPacket bin = null;
            isHeartBeating.set(true);
            //update xdual set x=now()/select 1 from dual
            CommandPacket packet = new CommandPacket(0, PacketType.COM_QUERY,"UPDATE XDUAL SET X=NOW()".getBytes());
            packet.write(socket.getOutputStream());

            lastActiveTime.set(System.currentTimeMillis());


            bin = new BinaryPacket(this.socket.getInputStream());
            if(bin.body[0] == (byte)0x00){
                //success
                return;
            }
            if(bin.body[0] == (byte)0xff){
                // error
                return;
            }
            ////bin.body[0] == (byte) 251文件,暂时不支持

            //field field_eof
            for (;;){
                bin  = new BinaryPacket(this.socket.getInputStream());
                if(bin.body[0] == (byte) 0xff){
                    // error
                    return;
                }

                if(bin.body[0] == (byte)0xfe){
                    //field_eof,break to next for(;;)
                    break;
                }
            }
            //rowData last_eof
            for(;;){
                bin = new BinaryPacket(this.socket.getInputStream());
                if(bin.body[0] == (byte)0xff){
                    //error
                    return;
                }
                //last eof
                if(bin.body[0]== (byte)0xfe){
                    //last_eof
                    return;
                }
            }
        }catch (Exception ex){
            throw new ProxyIoException(ex);
        }finally {
            isHeartBeating.set(false);
        }

    }

}
